package com.dk.foundation.common;

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.AppenderBase;
import com.dk.foundation.common.formatter.Formatter;
import com.dk.foundation.common.formatter.MessageFormatter;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public class KafkaAppender extends AppenderBase<ILoggingEvent> {
  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaAppender.class);
  private Formatter formatter = new MessageFormatter();
  private boolean logToSystemOut = false;
  private String topic;
  private String brokers;
  private KafkaProducer producer;
  private String timeout;

  private boolean syncSend=true;

  @Override
  public void start() {
    super.start();
    LOGGER.info("Starting KafkaAppender...");
    final Properties props = new Properties();
    try {
      props.put("bootstrap.servers", brokers);
      props.put("request.timeout.ms",  timeout);
      props.put("max.block.ms",  timeout);
      props.put("network.request.timeout.ms",  timeout);

      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

      producer = new KafkaProducer<>(props);
    } catch (Exception exception) {
      System.out.println("KafkaAppender: Exception initializing Producer. "+ exception +" : "+ exception.getMessage());
      exception.printStackTrace();
      throw new RuntimeException("KafkaAppender: Exception initializing Producer.",exception);
    }
    System.out.println("KafkaAppender: Producer initialized: "+ producer);
    if (topic == null) {
      LOGGER.error("KafkaAppender requires a topic. Add this to the appender configuration.");
      System.out.println("KafkaAppender requires a topic. Add this to the appender configuration.");
    } else {
      LOGGER.info("KafkaAppender will publish messages to the '{}' topic.",topic);
      System.out.println("KafkaAppender will publish messages to the '" + topic + "' topic.");
    }

    LOGGER.info("Kafka Producer Properties = {}", props);
    if (logToSystemOut) {
      System.out.println("KafkaAppender: properties = '" + props + "'.");
    }
  }

  @Override
  public void stop() {
    super.stop();
    LOGGER.info("Stopping KafkaAppender...");
    producer.close();
  }

  @Override
  protected void append(ILoggingEvent event) {
    String string = this.formatter.format(event);
    if (logToSystemOut) {
      System.out.println("KafkaAppender: Appending string: '" + string + "'.");
    }
    try {
      ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, string);
      producer.send(producerRecord);
    }catch(Exception e){
      System.out.println("KafkaAppender: Exception sending message: '" + e + " : "+ e.getMessage() + "'.");
      e.printStackTrace();
    }
  }

  public Formatter getFormatter() {
    return formatter;
  }

  public void setFormatter(Formatter formatter) {
    this.formatter = formatter;
  }

  public String getLogToSystemOut() {
    return logToSystemOut + "";
  }

  public void setLogToSystemOut(String logToSystemOutString) {
    if ("true".equalsIgnoreCase(logToSystemOutString)) {
      this.logToSystemOut = true;
    }
  }

  public String getTopic() {
    return topic;
  }

  public void setTopic(String topic) {
    this.topic = topic;
  }

  public String getBrokers() {
    return brokers;
  }

  public void setBrokers(String brokers) {
    this.brokers = brokers;
  }

  public String getTimeout() {
    return timeout;
  }

  public void setTimeout(String timeout) {
    this.timeout = timeout;
  }

  public boolean isSyncSend() {
    return syncSend;
  }

  public void setSyncSend(boolean syncSend) {
    this.syncSend = syncSend;
  }
}